MapOutputTrackerMaster — MapOutputTracker For Driver
MapOutputTrackerMaster is the MapOutputTracker for the driver.
A MapOutputTrackerMaster is the source of truth for the collection of MapStatus objects (map output locations) per shuffle id (as recorded from ShuffleMapTasks).
MapOutputTrackerMaster uses Java’s thread-safe java.util.concurrent.ConcurrentHashMap for the mapStatuses internal cache.
|
Note
|
There is currently a hardcoded limit on the number of map and reduce tasks (1000 each) above which Spark does not assign preferred locations (aka locality preferences) based on map output sizes.
|
It uses MetadataCleaner with MetadataCleanerType.MAP_OUTPUT_TRACKER as cleanerType and the cleanup function to drop entries in mapStatuses.
You should see the following INFO message when the MapOutputTrackerMaster is created (FIXME it uses MapOutputTrackerMasterEndpoint):
INFO SparkEnv: Registering MapOutputTracker
| Name | Description |
|---|---|
| | Internal cache with…FIXME Used when…FIXME |
| | Internal registry with…FIXME Used when…FIXME |
Tip
|
Enable DEBUG logging level for the MapOutputTrackerMaster logger to see what happens inside.
Refer to Logging. |
getSerializedMapOutputStatuses Method
|
Caution
|
FIXME |
unregisterMapOutput Method
|
Caution
|
FIXME |
cleanup Function for MetadataCleaner
The cleanup(cleanupTime: Long) method removes old entries from mapStatuses and cachedSerializedStatuses, i.e. entries with a timestamp earlier than cleanupTime.
It uses org.apache.spark.util.TimeStampedHashMap.clearOldValues method.
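The timestamp-based eviction described above can be illustrated with a minimal sketch. It does not use Spark's TimeStampedHashMap; the Stamped wrapper, the CleanupSketch object and the TrieMap-backed cache are hypothetical stand-ins, and println merely imitates the DEBUG message.

```scala
import scala.collection.concurrent.TrieMap

object CleanupSketch {
  // Hypothetical stand-in for a time-stamped entry: the value plus its insertion time.
  final case class Stamped[V](value: V, timestamp: Long)

  // Removes every entry whose timestamp is strictly earlier than cleanupTime
  // and returns the keys that were removed.
  def clearOldValues[K, V](cache: TrieMap[K, Stamped[V]], cleanupTime: Long): Seq[K] = {
    // Collect the expired keys first, then remove them one by one.
    val expired = cache.iterator.collect {
      case (key, stamped) if stamped.timestamp < cleanupTime => key
    }.toList
    expired.foreach { key =>
      println(s"DEBUG Removing key $key") // imitates the log line, not a real logger
      cache.remove(key)
    }
    expired
  }
}
```

In this sketch an entry stamped exactly at cleanupTime is kept, which matches "earlier than cleanupTime" read strictly.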
|
Tip
|
Enable DEBUG logging to see which entries are removed.
|
You should see the following DEBUG message in the logs for entries being removed:
DEBUG Removing key [entry.getKey]
Finding Preferred BlockManagers with Most Map Outputs (for ShuffleDependency and Partition) — getPreferredLocationsForShuffle Method
getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int): Seq[String]
getPreferredLocationsForShuffle finds the locations (i.e. BlockManagers) with the most map outputs for the input ShuffleDependency and Partition.
|
Note
|
getPreferredLocationsForShuffle is simply getLocationsWithLargestOutputs with a guard condition.
|
|
Note
|
A map output is a collection of shuffle blocks spread across BlockManagers. |
Internally, getPreferredLocationsForShuffle checks that the spark.shuffle.reduceLocality.enabled Spark property is enabled (it is by default) and that both the number of partitions of the RDD of the input ShuffleDependency and the number of partitions in the partitioner of the input ShuffleDependency are less than 1000.
|
Note
|
The thresholds for the number of partitions in the RDD and of the partitioner when computing the preferred locations are 1000 and are not configurable.
|
If the condition holds, getPreferredLocationsForShuffle finds the locations with the largest number of shuffle map outputs for the input ShuffleDependency and partitionId (passing the number of partitions in the partitioner of the input ShuffleDependency as numReducers and 0.2 as fractionThreshold) and returns the hosts of the preferred BlockManagers.
|
Note
|
0.2 is the fraction of total map output that must be at a location to be considered as a preferred location for a reduce task. It is not configurable.
|
|
Note
|
getPreferredLocationsForShuffle is used when ShuffledRDD and ShuffledRowRDD ask for preferred locations for a partition.
|
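The guard condition described in this section can be sketched as follows. This is an illustration under stated assumptions, not Spark's code: the Dep case class, the constant names and the largestOutputs function parameter (a stand-in for getLocationsWithLargestOutputs that returns host names directly) are all hypothetical. Only the values 1000 and 0.2 come from the text.

```scala
object PreferredLocationsSketch {
  // Values from the text: both thresholds are 1000, the fraction is 0.2.
  // The constant names themselves are hypothetical.
  val MapThreshold = 1000
  val ReduceThreshold = 1000
  val ReducerPrefLocsFraction = 0.2

  // Hypothetical, simplified view of a ShuffleDependency.
  final case class Dep(shuffleId: Int, rddPartitions: Int, partitionerPartitions: Int)

  def preferredHosts(
      dep: Dep,
      partitionId: Int,
      reduceLocalityEnabled: Boolean,
      // Stand-in for getLocationsWithLargestOutputs(shuffleId, reducerId, numReducers, fraction).
      largestOutputs: (Int, Int, Int, Double) => Option[Seq[String]]): Seq[String] = {
    val guardHolds = reduceLocalityEnabled &&
      dep.rddPartitions < MapThreshold &&
      dep.partitionerPartitions < ReduceThreshold
    if (guardHolds) {
      // No location above the threshold means no locality preference.
      largestOutputs(dep.shuffleId, partitionId, dep.partitionerPartitions, ReducerPrefLocsFraction)
        .getOrElse(Seq.empty)
    } else {
      Seq.empty
    }
  }
}
```

When the guard fails, the sketch returns an empty sequence, i.e. the reduce task gets no locality preference.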
Incrementing Epoch — incrementEpoch Method
incrementEpoch(): Unit
incrementEpoch increments the internal epoch.
You should see the following DEBUG message in the logs:
DEBUG MapOutputTrackerMaster: Increasing epoch to [epoch]
|
Note
|
incrementEpoch is used when MapOutputTrackerMaster registers map outputs (with changeEpoch flag enabled — it is disabled by default) and unregisters map outputs (for a shuffle, mapper and block manager), and when DAGScheduler is notified that an executor got lost (with filesLost flag enabled).
|
Finding Locations with Largest Number of Shuffle Map Outputs — getLocationsWithLargestOutputs Method
getLocationsWithLargestOutputs(
shuffleId: Int,
reducerId: Int,
numReducers: Int,
fractionThreshold: Double): Option[Array[BlockManagerId]]
getLocationsWithLargestOutputs returns the BlockManagerIds whose share of the shuffle block sizes (relative to the total size of all the shuffle blocks for the shuffle across all BlockManagers) is above the input fractionThreshold.
|
Note
|
getLocationsWithLargestOutputs may return no BlockManagerId if no BlockManager's shuffle blocks add up to a share above the input fractionThreshold.
|
|
Note
|
The input numReducers is not used.
|
Internally, getLocationsWithLargestOutputs queries the mapStatuses internal cache for the input shuffleId.
|
Note
|
One entry in |
getLocationsWithLargestOutputs iterates over the MapStatus array and builds an interim mapping between each BlockManagerId and the cumulative size of the shuffle blocks it manages.
|
Note
|
getLocationsWithLargestOutputs is used exclusively when MapOutputTrackerMaster finds the preferred locations (BlockManagers and hence executors) for a shuffle.
|
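The aggregation described above can be sketched in plain Scala. The Status case class is a hypothetical simplification of MapStatus (a location name plus per-reducer block sizes), and locations are plain strings rather than BlockManagerIds. Two points are assumptions of this sketch rather than statements from the text: sizes are taken for the given reducerId only, and the threshold comparison is inclusive.

```scala
import scala.collection.mutable

object LargestOutputsSketch {
  // Hypothetical, simplified MapStatus: where the map output lives and
  // how many bytes it holds for each reducer (indexed by reducerId).
  final case class Status(location: String, sizes: Array[Long])

  def locationsWithLargestOutputs(
      statuses: Array[Status],
      reducerId: Int,
      fractionThreshold: Double): Option[Array[String]] = {
    // Interim mapping: location -> cumulative block size at that location.
    val sizeByLocation = mutable.LinkedHashMap.empty[String, Long]
    var total = 0L
    for (status <- statuses if status != null) {
      val blockSize = status.sizes(reducerId)
      if (blockSize > 0) {
        sizeByLocation(status.location) =
          sizeByLocation.getOrElse(status.location, 0L) + blockSize
        total += blockSize
      }
    }
    // Keep locations whose share of the total reaches the threshold (assumed inclusive).
    // The map is empty whenever total is 0, so no division by zero can occur.
    val top = sizeByLocation.iterator.collect {
      case (location, size) if size.toDouble / total >= fractionThreshold => location
    }.toArray
    if (top.nonEmpty) Some(top) else None
  }
}
```

As in the signature above, numReducers plays no role, so the sketch omits it.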
Checking If Shuffle Map Output Is Tracked Already — containsShuffle Method
containsShuffle(shuffleId: Int): Boolean
containsShuffle checks if the input shuffleId is registered in the cachedSerializedStatuses or mapStatuses internal caches.
|
Note
|
containsShuffle is used exclusively when DAGScheduler creates a ShuffleMapStage (for ShuffleDependency and ActiveJob).
|
registerShuffle Method
registerShuffle(shuffleId: Int, numMaps: Int): Unit
registerShuffle registers the input shuffleId in the mapStatuses internal cache.
|
Note
|
The number of MapStatus entries in the new array in mapStatuses internal cache is exactly the input numMaps.
|
registerShuffle adds a lock in the shuffleIdLocks internal registry (without using it).
If the shuffleId has already been registered, registerShuffle throws an IllegalArgumentException with the following message:
Shuffle ID [id] registered twice
|
Note
|
registerShuffle is used exclusively when DAGScheduler creates a ShuffleMapStage (for ShuffleDependency and ActiveJob).
|
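A minimal sketch of the register-once behaviour, assuming a ConcurrentHashMap-backed cache as described at the top of this page. The Tracker class, the Status stand-in and the numMapSlots helper are hypothetical. The sketch checks only mapStatuses in containsShuffle and leaves out cachedSerializedStatuses and shuffleIdLocks.

```scala
import java.util.concurrent.ConcurrentHashMap

object RegisterShuffleSketch {
  // Hypothetical stand-in for MapStatus; only the array slots matter here.
  final case class Status(location: String)

  final class Tracker {
    private val mapStatuses = new ConcurrentHashMap[Int, Array[Status]]()

    def registerShuffle(shuffleId: Int, numMaps: Int): Unit = {
      // putIfAbsent returns the previous value, or null when the key was absent.
      if (mapStatuses.putIfAbsent(shuffleId, new Array[Status](numMaps)) != null) {
        throw new IllegalArgumentException(s"Shuffle ID $shuffleId registered twice")
      }
    }

    // Simplified: consults mapStatuses only.
    def containsShuffle(shuffleId: Int): Boolean = mapStatuses.containsKey(shuffleId)

    // Hypothetical helper exposing the size of the array created for a registered shuffle.
    def numMapSlots(shuffleId: Int): Int = mapStatuses.get(shuffleId).length
  }
}
```

Using putIfAbsent makes the check and the insert a single atomic step, so two concurrent registrations of the same shuffle id cannot both succeed.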
Registering Map Outputs for Shuffle (Possibly with Epoch Change) — registerMapOutputs Method
registerMapOutputs(
shuffleId: Int,
statuses: Array[MapStatus],
changeEpoch: Boolean = false): Unit
registerMapOutputs registers the input statuses (as the shuffle map output) with the input shuffleId in the mapStatuses internal cache.
registerMapOutputs increments epoch if the input changeEpoch is enabled (it is not by default).
|
Note
|
In both cases, the input |
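How registerMapOutputs and incrementEpoch interact can be sketched as follows. The Tracker class, the Status stand-in, the lock object and the getEpoch and outputsFor helpers are hypothetical. Storing a copy of the input array is a design choice of this sketch, and println only imitates the DEBUG message quoted earlier.

```scala
import java.util.concurrent.ConcurrentHashMap

object RegisterMapOutputsSketch {
  // Hypothetical stand-in for MapStatus.
  final case class Status(location: String)

  final class Tracker {
    private val mapStatuses = new ConcurrentHashMap[Int, Array[Status]]()
    private val epochLock = new Object
    private var epoch = 0L

    def getEpoch: Long = epochLock.synchronized(epoch)

    def incrementEpoch(): Unit = epochLock.synchronized {
      epoch += 1
      println(s"DEBUG MapOutputTrackerMaster: Increasing epoch to $epoch")
    }

    def registerMapOutputs(
        shuffleId: Int,
        statuses: Array[Status],
        changeEpoch: Boolean = false): Unit = {
      // Sketch-level choice: store a copy so later changes to the caller's array do not leak in.
      mapStatuses.put(shuffleId, statuses.clone())
      if (changeEpoch) incrementEpoch()
    }

    // Hypothetical read accessor used to inspect what was registered.
    def outputsFor(shuffleId: Int): Option[Seq[Status]] =
      Option(mapStatuses.get(shuffleId)).map(_.toSeq)
  }
}
```

With the default changeEpoch = false the epoch stays untouched; passing changeEpoch = true bumps it once per call.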
Settings
| Spark Property | Default Value | Description |
|---|---|---|
| spark.shuffle.reduceLocality.enabled | true | Controls whether to compute locality preferences for reduce tasks. When enabled (i.e. |